
package com.gcloud.mesh.framework.core.mq;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.gcloud.mesh.framework.core.mq.processor.MsgProcessors;
import com.gcloud.mesh.framework.core.mq.role.MyRole;
import com.gcloud.mesh.framework.core.thread.ThreadsManager;
import com.gcloud.mesh.header.msg.MqBaseMsg;
import com.gcloud.mesh.header.msg.MqBaseReplyMsg;
import com.gcloud.mesh.header.vo.RoleVo;

import lombok.extern.slf4j.Slf4j;

/**
 * RabbitMQ wiring for a mesh node, active only when {@code mesh.mq.enable=true}.
 *
 * <p>Declares one direct exchange named after this node's role type and two queues
 * (one named after the role type, one named after the role id), each bound to the
 * exchange with its own name as routing key. A single listener consumes both queues
 * and dispatches incoming messages to {@link MsgProcessors}.
 */
@Configuration
@Slf4j
@ConditionalOnProperty(name = "mesh.mq.enable", havingValue = "true")
public class RabbitMqConfig {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private MyRole myRole;

    /**
     * Direct exchange named after this node's role type.
     */
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(myRole.roleType());
    }

    /**
     * Queue named after the role type: durable, non-exclusive, auto-delete.
     */
    @Bean("queue_role")
    Queue queueRole() {
        return new Queue(myRole.roleType(), true, false, true);
    }

    /**
     * Queue named after the role id: durable, exclusive, auto-delete.
     */
    @Bean("queue_role_id")
    Queue queueRoleId() {
        return new Queue(myRole.roleId(), true, true, true);
    }

    /**
     * Binds the role-type queue to the exchange, using the queue name as routing key.
     */
    @Bean
    Binding bindingRole(DirectExchange exchange, @Qualifier("queue_role") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(queue.getName());
    }

    /**
     * Binds the role-id queue to the exchange, using the queue name as routing key.
     */
    @Bean
    Binding bindingRoleId(DirectExchange exchange, @Qualifier("queue_role_id") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(queue.getName());
    }

    /**
     * Entry point for every message arriving on the role-type or role-id queue.
     *
     * <p>Routing information is read from the message headers {@code action},
     * {@code msgId}, {@code reply}, {@code async}, {@code role} and {@code roleId}.
     * Messages whose {@code action} has no registered message class are dropped.
     * Messages flagged {@code reply=true} are handed to {@link SyncReplyMsgs};
     * everything else is processed on a {@link ThreadsManager} worker.
     *
     * <p>Any failure is logged and never propagated to the listener container.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = {"#{myRole.roleType()}", "#{myRole.roleId()}"})
    @RabbitHandler
    void receive(Message message) {
        try {
            String action = this.getHeaderProp(message, "action");
            String msgId = this.getHeaderProp(message, "msgId");
            String isReply = this.getHeaderProp(message, "reply");
            String async = this.getHeaderProp(message, "async");
            String role = this.getHeaderProp(message, "role");
            String roleId = this.getHeaderProp(message, "roleId");

            if (MsgProcessors.getMsgClass(action) == null) {
                // Leave a trace instead of dropping silently: a synchronous caller would
                // otherwise just time out with nothing in the logs on this side.
                log.warn("mq msg dropped, no msg class registered: action={}, msgId={}, role={}, roleId={}",
                        action, msgId, role, roleId);
                return;
            }
            MqBaseMsg msg = (MqBaseMsg) MqMsgSerializer.deserialize(message.getBody(), MsgProcessors.getMsgClass(action));
            msg.setMsgId(msgId);
            msg.setMsgFrom(new RoleVo(role, roleId));

            // This is the reply to a synchronous request issued by the server side.
            if (StringUtils.equals(isReply, "true")) {
                SyncReplyMsgs.addReply(msg);
                return;
            }

            // Process off the listener thread so the consumer is not blocked by the handler.
            ThreadsManager.submit(() -> {
                processAndReply(action, msgId, async, role, roleId, msg);
            });
        }
        catch (Throwable e) {
            log.error("mq receive msg failed: ", e);
        }
    }

    /**
     * Runs the processor registered for {@code action} and, when the sender asked for a
     * synchronous exchange ({@code async} header equal to {@code "false"}), sends the result back.
     *
     * <p>A processor failure is logged and converted into a failed {@link MqBaseReplyMsg}
     * carrying the exception message.
     */
    private void processAndReply(String action, String msgId, String async, String role, String roleId,
            MqBaseMsg msg) {
        MqBaseMsg reply;
        try {
            reply = MsgProcessors.process(action, msg);
        }
        catch (Throwable e) {
            // Log here: for async messages no reply is sent, so this is the only trace of the failure.
            log.error("mq process msg failed: action={}, msgId={}", action, msgId, e);
            reply = new MqBaseReplyMsg(false, e.getMessage());
        }

        // Synchronous request from the agent side: a result has to be returned.
        if (StringUtils.equals(async, "false")) {
            sendReply(action, msgId, role, roleId, reply);
        }
    }

    /**
     * Serializes {@code reply} and publishes it to the exchange named {@code role} with
     * routing key {@code roleId}, i.e. the values taken from the request headers.
     * Send failures are logged and swallowed.
     */
    private void sendReply(String action, String msgId, String role, String roleId, MqBaseMsg reply) {
        try {
            MessageProperties resProps = new MessageProperties();
            resProps.setHeader("action", action + "Reply");
            resProps.setHeader("msgId", msgId);
            resProps.setHeader("reply", true);
            resProps.setHeader("async", true);
            // NOTE(review): these echo the requester's own role/roleId, so the receiver will
            // record the reply as coming from itself. Looks like it should be this node's
            // identity (myRole) - confirm before changing, kept as-is for compatibility.
            resProps.setHeader("role", role);
            resProps.setHeader("roleId", roleId);
            Message res = new Message(MqMsgSerializer.serialize(reply), resProps);
            amqpTemplate.convertAndSend(role, roleId, res);
        }
        catch (Throwable e) {
            log.error("send reply msg failed: ", e);
        }
    }

    /**
     * Reads a header as a string.
     *
     * @param message the message whose headers are inspected
     * @param key     the header name
     * @return the header value's {@code toString()}, or {@code null} when the header is absent
     */
    private String getHeaderProp(Message message, String key) {
        Object obj = message.getMessageProperties().getHeaders().get(key);
        return obj == null ? null : obj.toString();
    }

}
